Source code for hysop.numerics.fft.scipy_fft

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
FFT iterface for fast Fourier Transforms using scipy fftpack backend (using scipy).
:class:`~hysop.numerics.ScipyFFT`
:class:`~hysop.numerics.ScipyFFTPlan`
"""

import numpy as np
import scipy as sp
from scipy import fftpack as _FFT

from hysop.tools.htypes import first_not_None
from hysop.numerics.fft.host_fft import HostFFTPlanI, HostFFTI, HostArray
from hysop.numerics.fft.fft import (
    complex_to_float_dtype,
    float_to_complex_dtype,
    mk_shape,
    mk_view,
)


[docs] class ScipyFFTPlan(HostFFTPlanI): """ Wrap a scipy fftpack call (scipy.fftpack does not offer real planning capabilities). """ def __init__(self, fn, x, out, axis, scaling=None, **kwds): super().__init__() self.fn = fn self.x = x self.out = out self.axis = axis self.scaling = scaling kwds = kwds.copy() kwds["x"] = x kwds["axis"] = axis self.kwds = kwds @property def input_array(self): return self.x @property def output_array(self): return self.out
[docs] def execute(self): fn = self.fn fn_kwds = self.kwds.copy() if fn is _FFT.irfft: assert "n" in fn_kwds x = self.x out = self.out if isinstance(x, HostArray): x = x.handle if isinstance(out, HostArray): out = out.handle axis = self.axis mkv = lambda *args, **kwds: mk_view(x.ndim, axis, *args, **kwds) if fn is _FFT.irfft: try: x = x.view(dtype=complex_to_float_dtype(x.dtype)) except ValueError: x = x.copy().view(dtype=complex_to_float_dtype(x.dtype)) is_even = fn_kwds["n"] % 2 == 0 rshape = list(x.shape) rshape[axis] -= 1 + is_even _in = np.empty(shape=rshape, dtype=x.dtype) _in[mkv(1, None)] = x[mkv(2, x.shape[axis] - is_even)] _in[mkv(0)] = x[mkv(0)] fn_kwds["x"] = _in # custom implementation of DCT-I and DST-I # default scipy FFTPACK implementation has optimal complexity # but O(sqrt(N)) error. if fn in (_FFT.dct, _FFT.idct) and fn_kwds["type"] == 1: N = x.shape[axis] shape = x.shape ndim = x.ndim slc0 = mk_view(ndim, axis, 1, -1) slc1 = mk_view(ndim, axis, None, None, -1) slc2 = mk_view(ndim, axis, 2, N, None) slc3 = mk_view(ndim, axis, 3, None, 2) slc4 = mk_view(ndim, axis, None, N, None) s0 = mk_shape(shape, axis, 2 * N - 2) X = np.empty(shape=s0, dtype=x.dtype) np.concatenate((x, x[slc0][slc1]), axis=axis, out=X) res = _FFT.rfft(X, axis=axis) res[slc2] = res[slc3] res = res[slc4] elif fn in (_FFT.dst, _FFT.idst) and fn_kwds["type"] == 1: N = x.shape[axis] shape = x.shape ndim = x.ndim slc0 = mk_view(ndim, axis, None, None, -1) slc1 = mk_view(ndim, axis, 2, None, 2) s0 = mk_shape(shape, axis, 2 * N + 2) s1 = mk_shape(shape, axis, 1) X = np.empty(shape=s0, dtype=x.dtype) Z = np.zeros(shape=s1, dtype=x.dtype) np.concatenate((Z, -x, Z, x[slc0]), axis=axis, out=X) res = _FFT.rfft(X, axis=axis)[slc1] else: res = fn(**fn_kwds) if fn is _FFT.rfft: assert axis in (x.ndim - 1, -1) is_even = x.shape[axis] % 2 == 0 rshape = list(res.shape) rshape[axis] += 1 + is_even if (out is None) or (not out.flags.c_contiguous): real_output = out out = np.empty(dtype=x.dtype, shape=rshape) else: rtype = complex_to_float_dtype(out.dtype) real_output = None out = out.view(dtype=rtype) assert np.array_equal(out.shape, rshape) ctype = float_to_complex_dtype(out.dtype) out[mkv(1, out.shape[axis] - is_even)] = res out[mkv(0)] = out[mkv(1)] out[mkv(1)] = 0.0 if is_even: out[mkv(-1)] = 0.0 out = out.view(dtype=ctype) if real_output is not None: real_output[...] = out out = real_output elif out is not None: out[...] = res else: out = res scaling = self.scaling if scaling is not None: out[...] *= scaling return out
[docs] class ScipyFFT(HostFFTI): """ Interface to compute local to process FFT-like transforms using the scipy fftpack backend. Scipy fftpack backend has some disadvantages: - creation of one or two intermediate temporary buffers at each call - hermitian-complex data layout is different than all other fft implementations - no planning capabilities (scipy.fftpack methods are just wrapped into fake plans) It has native float and double precision support unlike the numpy fft backend. Planning won't destroy original inputs. """ def __init__(self, backend=None, allocator=None, warn_on_allocation=True, **kwds): super().__init__( backend=backend, allocator=allocator, warn_on_allocation=warn_on_allocation, **kwds, ) self.supported_ftypes = ( np.float32, np.float64, ) self.supported_ctypes = ( np.complex64, np.complex128, )
[docs] def fft(self, a, out=None, axis=-1, **kwds): (shape, dtype) = super().fft(a=a, out=out, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = ScipyFFTPlan(fn=_FFT.fft, x=a, out=out, axis=axis, **kwds) return plan
[docs] def ifft(self, a, out=None, axis=-1, **kwds): (shape, dtype, s) = super().ifft(a=a, out=out, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = ScipyFFTPlan(fn=_FFT.ifft, x=a, out=out, axis=axis, **kwds) return plan
[docs] def rfft(self, a, out=None, axis=-1, **kwds): (shape, dtype) = super().rfft(a=a, out=out, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = ScipyFFTPlan(fn=_FFT.rfft, x=a, out=out, axis=axis, **kwds) return plan
[docs] def irfft(self, a, out=None, n=None, axis=-1, **kwds): (shape, dtype, s) = super().irfft(a=a, out=out, n=n, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = ScipyFFTPlan( fn=_FFT.irfft, x=a, out=out, axis=axis, n=shape[axis], **kwds ) return plan
[docs] def dct(self, a, out=None, type=2, axis=-1, **kwds): (shape, dtype) = super().dct(a=a, out=out, type=type, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = ScipyFFTPlan(fn=_FFT.dct, x=a, out=out, axis=axis, type=type, **kwds) return plan
[docs] def idct(self, a, out=None, type=2, axis=-1, scaling=None, **kwds): (shape, dtype, _, s) = super().idct(a=a, out=out, type=type, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = ScipyFFTPlan( fn=_FFT.idct, x=a, out=out, axis=axis, type=type, scaling=first_not_None(scaling, 1.0 / s), **kwds, ) return plan
[docs] def dst(self, a, out=None, type=2, axis=-1, **kwds): (shape, dtype) = super().dst(a=a, out=out, type=type, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = ScipyFFTPlan(fn=_FFT.dst, x=a, out=out, axis=axis, type=type, **kwds) return plan
[docs] def idst(self, a, out=None, type=2, axis=-1, scaling=None, **kwds): (shape, dtype, _, s) = super().idst(a=a, out=out, type=type, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = ScipyFFTPlan( fn=_FFT.idst, x=a, out=out, axis=axis, type=type, scaling=first_not_None(scaling, 1.0 / s), **kwds, ) return plan